package cn.rdtimes.wolfdsp.sdk;


import cn.rdtimes.wolfdsp.sdk.util.ResponseUtil;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;

/**
 * 任务服务器,将创建的Task放到服务中进行调度执行
 *
 * @author BZ
 */
public class TaskService {

    private static TaskService instance = null;

    /**
     * 任务完成监听器
     */
    private final TaskCompletedListener completedListener = new DefaultTaskCompletedListener();

    /**
     * key = jobId + taskId ; value = ScheduleTask对象
     */
    private final Map<String, Task> schedulingTaskMap = new ConcurrentHashMap<>(32);

    private int threadCount;
    private ExecutorService executorService;
    private TaskFactory taskFactory;

    public TaskService(TaskFactory taskFactory, int threadCount) {
        init(taskFactory, threadCount);
    }

    private void init(TaskFactory taskFactory, int threadCount) {
        if (instance != null) {
            throw new IllegalStateException("ScheduleService instance is singleton");
        }
        instance = this;

        this.taskFactory = taskFactory;
        this.threadCount = threadCount;
        this.executorService = Executors.newFixedThreadPool(threadCount);

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                executorService.shutdown();
                try {
                    executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public static TaskService getInstance() {
        return instance;
    }

    public int getThreadCount() {
        return threadCount;
    }

    final TaskCompletedListener getCompletedListener() {
        return completedListener;
    }

    final Map<String, Object> addScheduleTask(TaskStartMeta meta) throws Exception {
        TaskState state = TaskState.RUNNING;
        int progress;
        Map<String, Object> resp = new HashMap<>();

        Task task = getScheduleTask(meta.getJobId(), meta.getTaskId());
        if (task != null) {// 存在就获取其状态及进度
            state = task.getState();
            progress = task.incAndGetProgress(0);
            ResponseUtil.buildCommRsp(resp, meta.getJobId(), meta.getTaskId(), state, System.currentTimeMillis(),
                    progress, task.getExceptionInfo(),
                    (state.getValue() > TaskState.SKIP.getValue() ? task.getOutputParams() : null));
            completedListener.onCompleted(meta.getJobId(), meta.getTaskId(), state, null);
        } else {
            task = taskFactory.getTask(meta);
            if (task == null) {
                throw new Exception("create task is error");
            }
            schedulingTaskMap.put(getKey(meta.getJobId(), meta.getTaskId()), task);
            progress = task.incAndGetProgress(10);
            ResponseUtil.buildCommRsp(resp, meta.getJobId(), meta.getTaskId(), state, System.currentTimeMillis(),
                    progress, null, null);
            Future<?> future = executorService.submit(task);
            task.setFuture(future);
        }
        return resp;
    }

    final Map<String, Object> removeScheduleTask(String jobId, String taskId, long timestamp) throws Exception {
        TaskState state = TaskState.KILLED;
        int progress = 100;
        Map<String, Object> outputParams = null;
        Map<String, Object> resp = new HashMap<>();

        Task task = schedulingTaskMap.remove(getKey(jobId, taskId));
        if (task != null) { //存在就获取其状态及进度
            state = task.getState();
            boolean isFinished = state.getValue() > TaskState.SKIP.getValue();
            if (isFinished) {
                outputParams = task.getOutputParams();
            } else {
                state = TaskState.KILLED;
                Future<?> future = task.getFuture();
                if (future != null) {
                    future.cancel(true);
                }
            }
        }

        //状态都是完成或异常或停止
        ResponseUtil.buildCommRsp(resp, jobId, taskId, state, System.currentTimeMillis(),
                progress, (task != null ? task.getExceptionInfo() : null), outputParams);
        return resp;
    }

    final Map<String, Object> queryScheduleTask(String jobId, String taskId, long timestamp) {
        TaskState state = TaskState.FINISHED;
        int progress = 100;
        Map<String, Object> outputParams = null;
        Map<String, Object> resp = new HashMap<>();

        Task task = getScheduleTask(jobId, taskId);
        if (task != null) { // 存在就获取其状态及进度
            state = task.getState();
            progress = task.incAndGetProgress(0);
            outputParams = state.getValue() > TaskState.SKIP.getValue() ? task.getOutputParams() : null;
        }

        ResponseUtil.buildCommRsp(resp, jobId, taskId, state, System.currentTimeMillis(),
                progress, (task != null ? task.getExceptionInfo() : null), (task != null), outputParams);
        return resp;
    }

    public Task getScheduleTask(String jobId, String taskId) {
        return schedulingTaskMap.get(getKey(jobId, taskId));
    }

    public Set<Map.Entry<String, Task>> getSchedulingTasks() {
        return schedulingTaskMap.entrySet();
    }

    private String getKey(String jobId, String taskId) {
        return jobId + taskId;
    }

    /**
     * 任务完成时调用的监听器
     */
    interface TaskCompletedListener {

        /**
         * 任务完成或异常时调用
         *
         * @param jobId 任务流程id
         * @param taskId 任务id
         * @param state 任务状态
         * @param ex 异常
         */
        void onCompleted(String jobId, String taskId, TaskState state, Throwable ex);
    }

    private class DefaultTaskCompletedListener implements TaskCompletedListener {

        public void onCompleted(String jobId, String taskId, TaskState state, Throwable ex) {
            if (state.getValue() > TaskState.SKIP.getValue()) {
                schedulingTaskMap.remove(getKey(jobId, taskId));
            }
        }
    }

}
